{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "# Pipelines using Dask, Kubeflow and MLRun"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "## Create a project to host functions, jobs and artifacts\n",
    "\n",
    "Projects are used to package multiple functions, workflows, and artifacts. Project code and definitions are usually stored in a Git archive.\n",
    "\n",
    "The following code creates a new project in a local dir and initializes git tracking on it."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "> 2022-09-27 17:26:14,808 [info] loaded project sk-project-dask from MLRun DB\n"
     ]
    }
   ],
   "source": [
    "import os\n",
    "import mlrun\n",
    "import warnings\n",
    "\n",
    "warnings.filterwarnings(\"ignore\")\n",
    "\n",
    "# set project name and dir\n",
    "project_name = \"sk-project-dask\"\n",
    "project_dir = \"./\"\n",
    "\n",
    "# specify artifacts target location\n",
    "_, artifact_path = mlrun.set_environment(artifact_path=path)\n",
    "\n",
    "# set project\n",
    "sk_dask_proj = mlrun.get_or_create_project(project_name, project_dir, init_git=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "## Init Dask cluster"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "import mlrun\n",
    "\n",
    "# set up function from local file\n",
    "dsf = mlrun.new_function(name=\"mydask\", kind=\"dask\", image=\"mlrun/ml-models\")\n",
    "\n",
    "# set up function specs for dask\n",
    "dsf.spec.remote = True\n",
    "dsf.spec.replicas = 5\n",
    "dsf.spec.service_type = \"NodePort\"\n",
    "dsf.with_limits(mem=\"6G\")\n",
    "dsf.spec.nthreads = 5"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<mlrun.runtimes.daskjob.DaskCluster at 0x7f47fce9c850>"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# apply mount_v3io over the function so that the k8s pod that runs the function\n",
    "# can access the data (shared data access)\n",
    "dsf.apply(mlrun.mount_v3io())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'db://sk-project-dask/mydask'"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "dsf.save()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "> 2022-09-27 17:26:25,134 [info] trying dask client at: tcp://mlrun-mydask-d7df9301-d.default-tenant:8786\n",
      "> 2022-09-27 17:26:25,162 [info] using remote dask scheduler (mlrun-mydask-d7df9301-d) at: tcp://mlrun-mydask-d7df9301-d.default-tenant:8786\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<a href=\"http://default-tenant.app.alexp-edge.lab.iguazeng.com:32472/status\" target=\"_blank\" >dashboard link: default-tenant.app.alexp-edge.lab.iguazeng.com:32472</a>"
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/html": [
       "<div>\n",
       "    <div style=\"width: 24px; height: 24px; background-color: #e1e1e1; border: 3px solid #9D9D9D; border-radius: 5px; position: absolute;\"> </div>\n",
       "    <div style=\"margin-left: 48px;\">\n",
       "        <h3 style=\"margin-bottom: 0px;\">Client</h3>\n",
       "        <p style=\"color: #9D9D9D; margin-bottom: 0px;\">Client-83392da2-3e89-11ed-b7e8-82a5d7054c46</p>\n",
       "        <table style=\"width: 100%; text-align: left;\">\n",
       "\n",
       "        <tr>\n",
       "        \n",
       "            <td style=\"text-align: left;\"><strong>Connection method:</strong> Direct</td>\n",
       "            <td style=\"text-align: left;\"></td>\n",
       "        \n",
       "        </tr>\n",
       "\n",
       "        \n",
       "            <tr>\n",
       "                <td style=\"text-align: left;\">\n",
       "                    <strong>Dashboard: </strong> <a href=\"http://mlrun-mydask-d7df9301-d.default-tenant:8787/status\" target=\"_blank\">http://mlrun-mydask-d7df9301-d.default-tenant:8787/status</a>\n",
       "                </td>\n",
       "                <td style=\"text-align: left;\"></td>\n",
       "            </tr>\n",
       "        \n",
       "\n",
       "        </table>\n",
       "\n",
       "        \n",
       "            <details>\n",
       "            <summary style=\"margin-bottom: 20px;\"><h3 style=\"display: inline;\">Scheduler Info</h3></summary>\n",
       "            <div style=\"\">\n",
       "    <div>\n",
       "        <div style=\"width: 24px; height: 24px; background-color: #FFF7E5; border: 3px solid #FF6132; border-radius: 5px; position: absolute;\"> </div>\n",
       "        <div style=\"margin-left: 48px;\">\n",
       "            <h3 style=\"margin-bottom: 0px;\">Scheduler</h3>\n",
       "            <p style=\"color: #9D9D9D; margin-bottom: 0px;\">Scheduler-b8468d53-b900-4041-9982-5e14d5e5eb81</p>\n",
       "            <table style=\"width: 100%; text-align: left;\">\n",
       "                <tr>\n",
       "                    <td style=\"text-align: left;\">\n",
       "                        <strong>Comm:</strong> tcp://10.200.152.178:8786\n",
       "                    </td>\n",
       "                    <td style=\"text-align: left;\">\n",
       "                        <strong>Workers:</strong> 0\n",
       "                    </td>\n",
       "                </tr>\n",
       "                <tr>\n",
       "                    <td style=\"text-align: left;\">\n",
       "                        <strong>Dashboard:</strong> <a href=\"http://10.200.152.178:8787/status\" target=\"_blank\">http://10.200.152.178:8787/status</a>\n",
       "                    </td>\n",
       "                    <td style=\"text-align: left;\">\n",
       "                        <strong>Total threads:</strong> 0\n",
       "                    </td>\n",
       "                </tr>\n",
       "                <tr>\n",
       "                    <td style=\"text-align: left;\">\n",
       "                        <strong>Started:</strong> Just now\n",
       "                    </td>\n",
       "                    <td style=\"text-align: left;\">\n",
       "                        <strong>Total memory:</strong> 0 B\n",
       "                    </td>\n",
       "                </tr>\n",
       "            </table>\n",
       "        </div>\n",
       "    </div>\n",
       "\n",
       "    <details style=\"margin-left: 48px;\">\n",
       "        <summary style=\"margin-bottom: 20px;\">\n",
       "            <h3 style=\"display: inline;\">Workers</h3>\n",
       "        </summary>\n",
       "\n",
       "        \n",
       "\n",
       "    </details>\n",
       "</div>\n",
       "            </details>\n",
       "        \n",
       "\n",
       "    </div>\n",
       "</div>"
      ],
      "text/plain": [
       "<Client: 'tcp://10.200.152.178:8786' processes=0 threads=0, memory=0 B>"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# init dask cluster\n",
    "dsf.client"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "## Load and run a functions\n",
    "\n",
    "Load the function object from .py or .yaml file, or the Function Hub (marketplace).<br>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<mlrun.runtimes.kubejob.KubejobRuntime at 0x7f48353d5130>"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# load function from the Function Hub\n",
    "sk_dask_proj.set_function(\"hub://describe\", name=\"describe\")\n",
    "sk_dask_proj.set_function(\"hub://sklearn_classifier_dask\", name=\"dask_classifier\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "## Create a fully automated ML pipeline\n",
    "\n",
    "### Add more functions to the project to be used in the pipeline (from the Function Hub)\n",
    "\n",
    "Describe data, train and eval model with dask."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "### Define and save a pipeline\n",
    "\n",
    "The following workflow definition is written into a file. It describes a Kubeflow execution graph (DAG) \n",
    "and how functions and data are connected to form an end-to-end pipeline. \n",
    "\n",
    "* Describe data.\n",
    "* Train, test and evaluate with dask.\n",
    "\n",
    "Check the code below to see how functions objects are initialized and used (by name) inside the workflow.<br>\n",
    "The `workflow.py` file has two parts, initialize the function objects and define pipeline dsl (connect the function inputs and outputs).\n",
    "\n",
    "> Note: The pipeline can include CI steps like building container images and deploying models as illustrated in the following example.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Overwriting workflow.py\n"
     ]
    }
   ],
   "source": [
    "%%writefile workflow.py\n",
    "import os\n",
    "from kfp import dsl\n",
    "import mlrun\n",
    "\n",
    "# params\n",
    "funcs = {}\n",
    "LABELS = \"label\"\n",
    "DROP = \"congestion_surcharge\"\n",
    "DATA_URL = mlrun.get_sample_path(\"data/iris/iris_dataset.csv\")\n",
    "DASK_CLIENT = \"db://sk-project-dask/mydask\"\n",
    "\n",
    "\n",
    "# init functions are used to configure function resources and local settings\n",
    "def init_functions(functions: dict, project=None, secrets=None):\n",
    "    for f in functions.values():\n",
    "        f.apply(mlrun.mount_v3io())\n",
    "        pass\n",
    "\n",
    "\n",
    "@dsl.pipeline(name=\"Demo training pipeline\", description=\"Shows how to use mlrun\")\n",
    "def kfpipeline():\n",
    "    # Describe the data\n",
    "    describe = funcs[\"describe\"].as_step(\n",
    "        inputs={\"table\": DATA_URL},\n",
    "        params={\"dask_function\": DASK_CLIENT},\n",
    "    )\n",
    "\n",
    "    # Train, test and evaluate:\n",
    "    train = funcs[\"dask_classifier\"].as_step(\n",
    "        name=\"train\",\n",
    "        handler=\"train_model\",\n",
    "        inputs={\"dataset\": DATA_URL},\n",
    "        params={\n",
    "            \"label_column\": LABELS,\n",
    "            \"dask_function\": DASK_CLIENT,\n",
    "            \"test_size\": 0.10,\n",
    "            \"model_pkg_class\": \"sklearn.ensemble.RandomForestClassifier\",\n",
    "            \"drop_cols\": DROP,\n",
    "        },\n",
    "        outputs=[\"model\", \"test_set\"],\n",
    "    )\n",
    "    train.after(describe)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "# register the workflow file as \"main\", embed the workflow code into the project YAML\n",
    "sk_dask_proj.set_workflow(\"main\", \"workflow.py\", embed=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "Save the project definitions to a file (project.yaml). It is recommended to commit all changes to a Git repo."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<mlrun.projects.project.MlrunProject at 0x7f48342e4880>"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "sk_dask_proj.save()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "<a id='run-pipeline'></a>\n",
    "## Run a pipeline workflow\n",
    "Use the `run` method to execute a workflow. You can provide alternative arguments and specify the default target for workflow artifacts.<br>\n",
    "The workflow ID is returned and can be used to track the progress or you can use the hyperlinks.\n",
    "\n",
    "> Note: The same command can be issued through CLI commands:<br>\n",
    "    `mlrun project my-proj/ -r main -p \"v3io:///users/admin/mlrun/kfp/{{workflow.uid}}/\"`\n",
    "\n",
    "The `dirty` flag lets you run a project with uncommitted changes (when the notebook is in the same git dir it is always dirty).<br>\n",
    "The `watch` flag waits for the pipeline to complete and print results."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>Pipeline running (id=631ad0a3-19f1-4df0-bfa7-6c38c60275e0), <a href=\"https://dashboard.default-tenant.app.alexp-edge.lab.iguazeng.com/mlprojects/sk-project-dask/jobs/monitor-workflows/workflow/631ad0a3-19f1-4df0-bfa7-6c38c60275e0\" target=\"_blank\"><b>click here</b></a> to view the details in MLRun UI</div>"
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "image/svg+xml": [
       "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>\n",
       "<!DOCTYPE svg PUBLIC \"-//W3C//DTD SVG 1.1//EN\"\n",
       " \"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd\">\n",
       "<!-- Generated by graphviz version 2.43.0 (0)\n",
       " -->\n",
       "<!-- Title: kfp Pages: 1 -->\n",
       "<svg width=\"108pt\" height=\"116pt\"\n",
       " viewBox=\"0.00 0.00 108.09 116.00\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
       "<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 112)\">\n",
       "<title>kfp</title>\n",
       "<polygon fill=\"white\" stroke=\"transparent\" points=\"-4,4 -4,-112 104.09,-112 104.09,4 -4,4\"/>\n",
       "<!-- demo&#45;training&#45;pipeline&#45;48mrn&#45;2447047633 -->\n",
       "<g id=\"node1\" class=\"node\">\n",
       "<title>demo&#45;training&#45;pipeline&#45;48mrn&#45;2447047633</title>\n",
       "<ellipse fill=\"green\" stroke=\"black\" cx=\"50.05\" cy=\"-90\" rx=\"50.09\" ry=\"18\"/>\n",
       "<text text-anchor=\"middle\" x=\"50.05\" y=\"-86.3\" font-family=\"Times,serif\" font-size=\"14.00\">describe</text>\n",
       "</g>\n",
       "<!-- demo&#45;training&#45;pipeline&#45;48mrn&#45;366652794 -->\n",
       "<g id=\"node2\" class=\"node\">\n",
       "<title>demo&#45;training&#45;pipeline&#45;48mrn&#45;366652794</title>\n",
       "<ellipse fill=\"green\" stroke=\"black\" cx=\"50.05\" cy=\"-18\" rx=\"33.29\" ry=\"18\"/>\n",
       "<text text-anchor=\"middle\" x=\"50.05\" y=\"-14.3\" font-family=\"Times,serif\" font-size=\"14.00\">train</text>\n",
       "</g>\n",
       "<!-- demo&#45;training&#45;pipeline&#45;48mrn&#45;2447047633&#45;&gt;demo&#45;training&#45;pipeline&#45;48mrn&#45;366652794 -->\n",
       "<g id=\"edge1\" class=\"edge\">\n",
       "<title>demo&#45;training&#45;pipeline&#45;48mrn&#45;2447047633&#45;&gt;demo&#45;training&#45;pipeline&#45;48mrn&#45;366652794</title>\n",
       "<path fill=\"none\" stroke=\"black\" d=\"M50.05,-71.7C50.05,-63.98 50.05,-54.71 50.05,-46.11\"/>\n",
       "<polygon fill=\"black\" stroke=\"black\" points=\"53.55,-46.1 50.05,-36.1 46.55,-46.1 53.55,-46.1\"/>\n",
       "</g>\n",
       "</g>\n",
       "</svg>\n"
      ],
      "text/plain": [
       "<graphviz.graphs.Digraph at 0x7f47fce02a90>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/html": [
       "<h2>Run Results</h2>Workflow 631ad0a3-19f1-4df0-bfa7-6c38c60275e0 finished, state=Succeeded<br>click the hyper links below to see detailed results<br><table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th>uid</th>\n",
       "      <th>start</th>\n",
       "      <th>state</th>\n",
       "      <th>name</th>\n",
       "      <th>parameters</th>\n",
       "      <th>results</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <td><div title=\"c3dd1c89ad79458596f90f1b25cd95b4\"><a href=\"https://dashboard.default-tenant.app.alexp-edge.lab.iguazeng.com/mlprojects/sk-project-dask/jobs/monitor/c3dd1c89ad79458596f90f1b25cd95b4/overview\" target=\"_blank\" >...25cd95b4</a></div></td>\n",
       "      <td>Sep 27 17:27:09</td>\n",
       "      <td>completed</td>\n",
       "      <td>train</td>\n",
       "      <td><div class=\"dictlist\">label_column=label</div><div class=\"dictlist\">dask_function=db://sk-project-dask/mydask</div><div class=\"dictlist\">test_size=0.1</div><div class=\"dictlist\">model_pkg_class=sklearn.ensemble.RandomForestClassifier</div><div class=\"dictlist\">drop_cols=congestion_surcharge</div></td>\n",
       "      <td><div class=\"dictlist\">micro=0.9944598337950138</div><div class=\"dictlist\">macro=0.9945823158323159</div><div class=\"dictlist\">precision-0=1.0</div><div class=\"dictlist\">precision-1=0.9166666666666666</div><div class=\"dictlist\">precision-2=0.8</div><div class=\"dictlist\">recall-0=1.0</div><div class=\"dictlist\">recall-1=0.7857142857142857</div><div class=\"dictlist\">recall-2=0.9230769230769231</div><div class=\"dictlist\">f1-0=1.0</div><div class=\"dictlist\">f1-1=0.8461538461538461</div><div class=\"dictlist\">f1-2=0.8571428571428571</div></td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <td><div title=\"a5b1e363377e4df4b88bb14e6516a656\"><a href=\"https://dashboard.default-tenant.app.alexp-edge.lab.iguazeng.com/mlprojects/sk-project-dask/jobs/monitor/a5b1e363377e4df4b88bb14e6516a656/overview\" target=\"_blank\" >...6516a656</a></div></td>\n",
       "      <td>Sep 27 17:26:42</td>\n",
       "      <td>completed</td>\n",
       "      <td>describe</td>\n",
       "      <td><div class=\"dictlist\">dask_function=db://sk-project-dask/mydask</div></td>\n",
       "      <td></td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>"
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "artifact_path = os.path.abspath(\"./pipe/{{workflow.uid}}\")\n",
    "run_id = sk_dask_proj.run(\n",
    "    \"main\", arguments={}, artifact_path=artifact_path, dirty=False, watch=True\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "**[back to top](#top)**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
